java调用kafka的API生产者和消费者

您所在的位置:网站首页 kafka调用read api java调用kafka的API生产者和消费者

java调用kafka的API生产者和消费者

2024-07-16 19:35| 来源: 网络整理| 查看: 265

一、 Producer API 1.1消息发送流程

Kafka 的 Producer 发送消息采用的是异步发送的方式。在消息发送的过程中,涉及到了 两个线程——main 线程和 Sender 线程,以及一个线程共享变量——RecordAccumulator。 main 线程将消息发送给 RecordAccumulator,Sender 线程不断从 RecordAccumulator 中拉取消息发送到 Kafka broker。 在这里插入图片描述

1.2 编写代码

导入依赖,这里我导入依赖是对上自己下载的版本

org.apache.kafka kafka-clients 2.3.0

需要用到的类: KafkaProducer:需要创建一个生产者对象,用来发送数据 ProducerConfig:获取所需的一系列配置参数 ProducerRecord:每条数据都要封装成一个 ProducerRecord 对象

import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; public class kafka_demo { public static void main(String[] args){ Properties properties = new Properties(); //broker的地址清单,建议至少填写两个,避免宕机 properties.put("bootstrap.servers", "192.168.xx.xx:9092"); //acks指定必须有多少个分区副本接收消息,生产者才认为消息写入成功,用户检测数据丢失的可能性 //acks=0:生产者在成功写入消息之前不会等待任何来自服务器的响应。无法监控数据是否发送成功,但可以以网络能够支持的最大速度发送消息,达到很高的吞吐量。 //acks=1:只要集群的首领节点收到消息,生产者就会收到来自服务器的成功响应。 //acks=all:只有所有参与复制的节点全部收到消息时,生产者才会收到来自服务器的成功响应。这种模式是最安全的, properties.put("acks", "all"); //retries:生产者从服务器收到的错误有可能是临时性的错误的次数 properties.put("retries", 0); //batch.size:该参数指定了一个批次可以使用的内存大小,按照字节数计算(而不是消息个数) properties.put("batch.size", 16384); //linger.ms:该参数指定了生产者在发送批次之前等待更多消息加入批次的时间,增加延迟,提高吞吐量 properties.put("linger.ms", 1); //buffer.memory该参数用来设置生产者内存缓冲区的大小,生产者用它缓冲要发送到服务器的消息。 properties.put("buffer.memory", 33554432); //key和value的序列化 properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer producer = new KafkaProducer(properties); try { //producer = new KafkaProducer(properties); for (int i = 0; i


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3